// Copyright (c) 2022 by Duguang.IO Inc. All Rights Reserved.
// Author: Ethan Liu
// Date: 2022-05-21 12:47:52

package runtime

import (
	"bytes"
	"context"
	"encoding/json"
	"jianmu-worker-docker/client"
	"jianmu-worker-docker/engine"
	"jianmu-worker-docker/livelog"
	"time"

	"github.com/hashicorp/go-multierror"

	logger "jianmu-worker-docker/logging"
)

// Worker holds the server client, the Docker engine, and the identity and
// configuration used by the Worker methods to run and resume runners.
type Worker struct {
	// Client is the client responsible for talking to the CI server.
	Client client.Client
	// Engine is the Docker client.
	Engine *engine.Docker
	// ID is the worker ID.
	ID string
	// Name is the worker name.
	Name string
	// Type is the worker type. Config is the container configuration that
	// ApplyWorkerConfig copies onto runners; when it is nil nothing is applied.
	Type   string
	Config *ContainerConfig
}

// ContainerConfig is the worker-level container configuration.
type ContainerConfig struct {
	// Cpus is the CPU core limit; the minimum is 0.01.
	Cpus float64
	// Memory is the memory limit, in bytes.
	Memory int64
	// NetworkMode is the network mode: bridge, host, none, default, or a
	// user-defined network.
	NetworkMode string
	// Timeout is the maximum run time of a container, in seconds.
	Timeout int
}

// ApplyWorkerConfig copies the worker's container configuration onto runner
// and returns the same runner.
//
// When the worker has no configuration the runner is returned untouched.
// Otherwise the CPU, memory and network-mode settings of the runner spec are
// overwritten, and the worker timeout is used only if the runner itself does
// not carry one (Timeout == 0).
func (w *Worker) ApplyWorkerConfig(runner *engine.Runner) *engine.Runner {
	cfg := w.Config
	if cfg != nil {
		// Resource limits and networking always come from the worker.
		runner.Spec.CPUS = cfg.Cpus
		runner.Spec.MemLimit = cfg.Memory
		runner.Spec.NetworkMode = cfg.NetworkMode

		// A timeout supplied with the task wins over the worker default.
		if runner.Timeout == 0 {
			runner.Timeout = cfg.Timeout
		}
	}
	return runner
}

// Run accepts runner on the server and then executes it.
//
// If the server reports client.ErrOptimisticLock the runner was taken by
// another worker and Run returns nil without doing anything; any other accept
// error is logged and returned. After a successful accept the worker
// configuration is applied and the runner is dispatched on its Type: "TASK"
// runners go to run, "VOLUME" runners go to doVolume, and any other type is
// logged and ignored.
func (w *Worker) Run(ctx context.Context, runner *engine.Runner) error {
	entry := logger.FromContext(ctx).WithField("runner.id", runner.ID)

	if acceptErr := w.Client.Accept(ctx, runner); acceptErr != nil {
		if acceptErr == client.ErrOptimisticLock {
			entry.Debug("runner accepted by another worker")
			return nil
		}
		entry.WithError(acceptErr).Error("cannot accept runner")
		return acceptErr
	}

	// Apply the worker-level container configuration.
	w.ApplyWorkerConfig(runner)

	if runner.Type == "TASK" {
		return w.run(ctx, runner)
	}
	if runner.Type == "VOLUME" {
		return w.doVolume(ctx, runner)
	}

	entry.Debugln("wrong format runner")
	return nil
}

// Resume resumes runner.
//
// Only runners of Type "TASK" are resumed (via resume); any other type is
// logged at debug level and ignored, and nil is returned.
func (w *Worker) Resume(ctx context.Context, runner *engine.Runner) error {
	entry := logger.FromContext(ctx).WithField("runner.id", runner.ID)

	if runner.Type == "TASK" {
		return w.resume(ctx, runner)
	}

	entry.Debugln("wrong format runner")
	return nil
}

// resume re-attaches to the runner's task through Engine.Resume, forwards its
// output to the server log stream and reports the final task state.
//
// A background goroutine watches the server for a cancellation of the task;
// when one arrives it marks the task failed (exit code -1), cleans the task
// container and cancels the context handed to Engine.Resume.
//
// Reported outcomes:
//   - context cancelled or timed out: Failed, exit code -2
//   - container killed by the OOM killer: Failed, exit code 137
//   - container exited: Succeed for exit code 0, Failed otherwise
//   - engine error without an exit state: Failed, exit code -1, ErrorMsg set
//
// The returned error is the first status-update or clean-up failure, or else
// any error produced while closing the log stream. The engine error itself is
// reported to the server but not returned.
func (w *Worker) resume(ctx context.Context, runner *engine.Runner) error {
	log := logger.FromContext(ctx).
		WithField("runner.id", runner.ID)

	ctxdone, cancel := context.WithCancel(ctx)
	defer cancel()

	// Watch the server for a cancellation of this task.
	go func() {
		done, err := w.Client.Watch(ctxdone, runner.ID)
		if err != nil {
			log.Debugf("监听任务失败： %v", err)
		}
		if !done {
			log.Debugln("done listening for cancellations")
			return
		}
		task := &engine.Task{
			TaskId:   runner.ID,
			Status:   engine.Failed,
			ExitCode: -1,
		}
		// Use the parent context here: ctxdone is about to be cancelled.
		if err := w.Client.Update(ctx, task); err != nil {
			log.WithError(err).Error("cannot update task status")
		}
		if err := w.Engine.Clean(ctx, runner); err != nil {
			log.WithError(err).Error("cannot clean task container")
		}
		cancel()
		log.WithField("runner.id", runner.ID).Debugln("收到任务取消通知")
	}()

	// finish pushes the final task state to the server and then removes the
	// task container, stopping at the first failure.
	finish := func(c context.Context, t *engine.Task) error {
		if err := w.Client.Update(c, t); err != nil {
			log.WithError(err).Error("cannot update task status")
			return err
		}
		if err := w.Engine.Clean(c, runner); err != nil {
			log.WithError(err).Error("cannot clean task container")
			return err
		}
		return nil
	}

	var result error

	stream := livelog.New(w.Client, w.ID, runner.ID)
	// NOTE(review): newReplacer is defined elsewhere; presumably it masks the
	// runner's secrets in the output before it reaches the log stream.
	wc := newReplacer(stream, runner.Spec.Secrets)

	// Run under ctxdone so that a cancellation seen by the watcher above
	// actually interrupts the engine.
	exited, err := w.Engine.Resume(ctxdone, runner, wc)

	if err != nil {
		// Best effort: surface the engine error in the task log.
		stream.Write([]byte(err.Error()))
	}

	// Close the log stream.
	if err := stream.Close(); err != nil {
		result = multierror.Append(result, err)
	}

	task := &engine.Task{
		TaskId:   runner.ID,
		ExitCode: 0,
	}

	// A cancelled or expired context means the task was cancelled.
	switch ctxdone.Err() {
	case context.Canceled, context.DeadlineExceeded:
		log.Debugln("任务取消")
		task.Status = engine.Failed
		task.ExitCode = -2
		// The task context is dead; report and clean with a fresh one.
		return finish(context.Background(), task)
	}

	if exited != nil {
		if exited.OOMKilled {
			log.Debugln("received oom kill.")
			task.ExitCode = 137
		} else {
			log.Debugf("received exit code %d", exited.ExitCode)
			task.ExitCode = exited.ExitCode
		}
		// Derive the status from the reported exit code so that an OOM kill
		// can never be recorded as a success.
		if task.ExitCode == 0 {
			task.Status = engine.Succeed
		} else {
			task.Status = engine.Failed
		}
		task.ResultFile = exited.ResultFile
		if err := finish(ctx, task); err != nil {
			return err
		}
		return result
	}

	if err != nil {
		// A context error from the engine is a cancellation; report it once.
		if err == context.Canceled || err == context.DeadlineExceeded {
			log.Debugln("任务取消")
			task.Status = engine.Failed
			task.ExitCode = -2
			return finish(context.Background(), task)
		}

		failed := &engine.Task{
			TaskId:   runner.ID,
			Status:   engine.Failed,
			ExitCode: -1,
			ErrorMsg: err.Error(),
		}
		if err := finish(ctx, failed); err != nil {
			return err
		}
	}

	return result
}

// doVolume performs the volume operation described by runner.Volume and
// reports the outcome to the server.
//
// A Type of "CREATION" creates the named volume and "DELETION" deletes it; any
// other type performs no engine call. The task is reported as Succeed unless
// the engine call failed, in which case it is reported as Failed with exit
// code 255 and the error text. Only a failure to update the task status is
// returned to the caller.
func (w *Worker) doVolume(ctx context.Context, runner *engine.Runner) error {
	entry := logger.FromContext(ctx).WithField("runner.id", runner.ID)

	var opErr error
	if kind := runner.Volume.Type; kind == "CREATION" {
		opErr = w.Engine.CreateVolume(ctx, runner.Volume.Name)
		if opErr == nil {
			entry.WithField("Volume", runner.Volume.Name).Debugln("创建成功")
		} else {
			entry.WithError(opErr).Error("cannot create volume")
		}
	} else if kind == "DELETION" {
		opErr = w.Engine.DeleteVolume(ctx, runner.Volume.Name)
		if opErr == nil {
			entry.WithField("Volume", runner.Volume.Name).Debugln("删除成功")
		} else {
			entry.WithError(opErr).Error("cannot delete volume")
		}
	}

	// Build the status report from the outcome of the engine call.
	report := &engine.Task{TaskId: runner.ID}
	if opErr == nil {
		report.Status = engine.Succeed
	} else {
		report.Status = engine.Failed
		report.ExitCode = 255
		report.ErrorMsg = opErr.Error()
	}

	updateErr := w.Client.Update(ctx, report)
	if updateErr != nil {
		entry.WithError(updateErr).Error("cannot update task status")
	}
	return updateErr
}

// run marks the runner's task as running on the server and executes it via
// exec.
//
// Three nested contexts are set up:
//   - ctxdone bounds the cancellation watcher and ends when run returns;
//   - ctxtimeout adds the runner timeout (runner.Timeout seconds) when that
//     timeout is positive; a non-positive timeout means no deadline, because
//     context.WithTimeout with a zero duration would expire immediately and
//     cancel the task before it starts;
//   - ctxcancel is the context the task executes under; the watcher cancels
//     it when the server reports the task as cancelled.
//
// run returns the error from the initial status update, or else whatever
// exec returns.
func (w *Worker) run(ctx context.Context, runner *engine.Runner) error {
	log := logger.FromContext(ctx).
		WithField("runner.id", runner.ID)

	ctxdone, cancelDone := context.WithCancel(ctx)
	defer cancelDone()

	ctxtimeout := ctxdone
	if runner.Timeout > 0 {
		var cancelTimeout context.CancelFunc
		ctxtimeout, cancelTimeout = context.WithTimeout(ctxdone, time.Duration(runner.Timeout)*time.Second)
		defer cancelTimeout()
	}

	ctxcancel, cancelRun := context.WithCancel(ctxtimeout)
	defer cancelRun()

	// Watch the server for a cancellation of this task.
	go func() {
		done, err := w.Client.Watch(ctxdone, runner.ID)
		if err != nil {
			log.Debugf("监听任务失败： %v", err)
		}
		if !done {
			log.Debugln("done listening for cancellations")
			return
		}
		task := &engine.Task{
			TaskId:   runner.ID,
			Status:   engine.Failed,
			ExitCode: -1,
		}
		// Use the parent context: the execution context is about to be
		// cancelled.
		if err := w.Client.Update(ctx, task); err != nil {
			log.WithError(err).Error("cannot update task status")
		}
		if err := w.Engine.Clean(ctx, runner); err != nil {
			log.WithError(err).Error("cannot clean task container")
		}
		cancelRun()
		log.WithField("runner.id", runner.ID).Debugln("收到任务取消通知")
	}()

	task := &engine.Task{
		TaskId: runner.ID,
		Status: engine.Running,
	}
	if err := w.Client.Update(ctx, task); err != nil {
		log.WithError(err).Error("cannot update task status")
		return err
	}

	log.Debugln("updated task to running")

	// Dump the runner for debugging; a marshalling failure is logged and does
	// not stop the task.
	// NOTE(review): the serialized runner may include Spec.Secrets — confirm
	// that they are excluded from JSON output or that debug logs are trusted.
	if s, e := json.Marshal(runner); e != nil {
		log.WithError(e).Errorln("序列化错误")
	} else {
		log.Debugln(string(s))
	}

	ctxlogger := logger.WithLogger(ctxcancel, log)
	return w.exec(ctxlogger, runner)
}

// exec runs the runner through Engine.Run, forwards its output to the server
// log stream and reports the final task state.
//
// Before the task starts, a short banner naming the worker (name, type, ID),
// the image and the image pull policy is written to the log stream.
//
// Reported outcomes:
//   - context cancelled or timed out: Failed, exit code -2
//   - container killed by the OOM killer: Failed, exit code 137
//   - container exited: Succeed for exit code 0, Failed otherwise
//   - engine error without an exit state: Failed, exit code -1, ErrorMsg set
//
// The returned error is the first status-update or clean-up failure, or else
// any error produced while closing the log stream. The engine error itself is
// reported to the server but not returned.
func (w *Worker) exec(ctx context.Context, runner *engine.Runner) error {
	var result error

	log := logger.FromContext(ctx).
		WithField("runner.id", runner.ID)

	// finish pushes the final task state to the server and then removes the
	// task container, stopping at the first failure.
	finish := func(c context.Context, t *engine.Task) error {
		if err := w.Client.Update(c, t); err != nil {
			log.WithError(err).Error("cannot update task status")
			return err
		}
		if err := w.Engine.Clean(c, runner); err != nil {
			log.WithError(err).Error("cannot clean task container")
			return err
		}
		return nil
	}

	// Banner shown at the top of the task log.
	var bt bytes.Buffer
	bt.WriteString("当前任务运行于Jianmu-worker ")
	bt.WriteString(w.Name)
	bt.WriteString("\n")
	bt.WriteString("worker类型为: ")
	bt.WriteString(w.Type)
	bt.WriteString("\n")
	bt.WriteString(" worker ID为: ")
	bt.WriteString(w.ID)
	bt.WriteString("\n")
	bt.WriteString("当前任务使用镜像为: ")
	bt.WriteString(runner.Spec.Image)
	bt.WriteString("\n")
	bt.WriteString("镜像下载策略为: ")
	bt.WriteString(runner.Pull.String())
	bt.WriteString("\n")

	stream := livelog.New(w.Client, w.ID, runner.ID)
	// Best effort: a failed banner write must not prevent the task from running.
	stream.Write(bt.Bytes())
	// NOTE(review): newReplacer is defined elsewhere; presumably it masks the
	// runner's secrets in the output before it reaches the log stream.
	wc := newReplacer(stream, runner.Spec.Secrets)
	exited, err := w.Engine.Run(ctx, runner, wc)

	if err != nil {
		// Best effort: surface the engine error in the task log.
		stream.Write([]byte(err.Error()))
	}

	// Close the log stream.
	if err := stream.Close(); err != nil {
		result = multierror.Append(result, err)
	}

	task := &engine.Task{
		TaskId:   runner.ID,
		ExitCode: 0,
	}

	// A cancelled or expired context means the task was cancelled.
	switch ctx.Err() {
	case context.Canceled, context.DeadlineExceeded:
		log.Debugln("任务取消")
		task.Status = engine.Failed
		task.ExitCode = -2
		// The task context is dead; report and clean with a fresh one.
		return finish(context.Background(), task)
	}

	if exited != nil {
		if exited.OOMKilled {
			log.Debugln("received oom kill.")
			task.ExitCode = 137
		} else {
			log.Debugf("received exit code %d", exited.ExitCode)
			task.ExitCode = exited.ExitCode
		}
		// Derive the status from the reported exit code so that an OOM kill
		// can never be recorded as a success.
		if task.ExitCode == 0 {
			task.Status = engine.Succeed
		} else {
			task.Status = engine.Failed
		}
		task.ResultFile = exited.ResultFile
		if err := finish(ctx, task); err != nil {
			return err
		}
		return result
	}

	if err != nil {
		// A context error from the engine is a cancellation; report it once.
		if err == context.Canceled || err == context.DeadlineExceeded {
			log.Debugln("任务取消")
			task.Status = engine.Failed
			task.ExitCode = -2
			return finish(context.Background(), task)
		}

		failed := &engine.Task{
			TaskId:   runner.ID,
			Status:   engine.Failed,
			ExitCode: -1,
			ErrorMsg: err.Error(),
		}
		if err := finish(ctx, failed); err != nil {
			return err
		}
	}

	return result
}
